Managing Worker Threads
Delaying Availability of Workers
A worker thread will not be made available to process tasks until Piscina determines that it is "ready". By default, a worker is ready as soon as Piscina loads it and acquires a reference to the exported handler function.
At times, a worker may need to stay unavailable longer while it initializes the resources it needs to operate. To support this case, the worker module may export a Promise that resolves to the handler function instead of exporting the function directly:
- JavaScript
- TypeScript
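async function initialize() {
  await someAsyncInitializationActivity();
  // The resolved value is the handler Piscina calls for each task
  return ({ a, b }) => a + b;
}

// Export the pending Promise, not the function itself
module.exports = initialize();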
interface OperationInput {
  a: number;
  b: number;
}

async function initialize(): Promise<(input: OperationInput) => number> {
  await someAsyncInitializationActivity();
  return ({ a, b }) => a + b;
}

// Export the pending Promise (not the function) as the default export
export default initialize();
Piscina will await the resolution of the exported Promise before marking the worker thread available.
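The effect of this rule can be illustrated with a minimal sketch. The `resolveHandler` helper below is hypothetical and is not Piscina's actual loading code; it only shows why a module may export either a function or a Promise that resolves to one.

```javascript
// Hypothetical helper (not part of Piscina): turns a worker module's export
// into a callable handler, whether it is a function or a Promise for one.
async function resolveHandler(exported) {
  // `await` passes a plain function through unchanged and unwraps a Promise
  const handler = await exported;
  if (typeof handler !== "function") {
    throw new TypeError("worker module did not provide a handler function");
  }
  return handler;
}
```

Under this sketch, `resolveHandler(fn)` and `resolveHandler(Promise.resolve(fn))` both yield `fn`, and the worker would only be considered ready once the returned Promise settles.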
Managing Task Overload with Backpressure
When the maxQueue option is set, once the Piscina queue is full, no additional tasks may be submitted until the queue size falls below the limit. The 'drain' event may be used to receive notification when the queue is empty and all tasks have been submitted to workers for processing.
The example below shows how to use a Node.js stream to feed a Piscina worker pool:
- JavaScript
- TypeScript
"use strict";

const { resolve } = require("path");
const Pool = require("piscina");

const pool = new Pool({
  filename: resolve(__dirname, "worker.js"),
  maxQueue: "auto",
});

const stream = getStreamSomehow();
stream.setEncoding("utf8");

// Number of chunks submitted so far (used only for logging)
let counter = 0;

// Resume reading once the queue has emptied
pool.on("drain", () => {
  if (stream.isPaused()) {
    console.log("resuming...", counter, pool.queueSize);
    stream.resume();
  }
});

stream
  .on("data", (data) => {
    counter++;
    pool.run(data);
    // Stop reading while the queue is at its limit
    if (pool.queueSize === pool.options.maxQueue) {
      console.log("pausing...", counter, pool.queueSize);
      stream.pause();
    }
  })
  .on("error", console.error)
  .on("end", () => {
    console.log("done");
  });
import Pool from "piscina";
import { resolve } from "path";
import { filename } from "./worker.js";

const pool = new Pool({
  filename: resolve(__dirname, "./workerWrapper.js"),
  workerData: { fullpath: filename },
  maxQueue: "auto",
});

const stream = getStreamSomehow();
stream.setEncoding("utf8");

// Number of chunks submitted so far (used only for logging)
let counter = 0;

// Resume reading once the queue has emptied
pool.on("drain", () => {
  if (stream.isPaused()) {
    console.log("resuming...", counter, pool.queueSize);
    stream.resume();
  }
});

stream
  .on("data", (data: string) => {
    counter++;
    pool.run(data);
    // Stop reading while the queue is at its limit
    if (pool.queueSize === pool.options.maxQueue) {
      console.log("pausing...", counter, pool.queueSize);
      stream.pause();
    }
  })
  .on("error", console.error)
  .on("end", () => {
    console.log("done");
  });

// workerWrapper.js
const { workerData } = require('worker_threads');

// Register ts-node only when the real worker is a TypeScript file
if (workerData.fullpath.endsWith(".ts")) {
  require("ts-node").register();
}

module.exports = require(workerData.fullpath);
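The pause/resume pattern above can also be expressed with async iteration. The sketch below is one possible alternative, not an official Piscina recipe: `feedPool` is a hypothetical helper, and it relies only on the pool members the example above already uses (`run()`, `queueSize`, `options.maxQueue`, and the 'drain' event). It assumes 'drain' is emitted asynchronously, after `run()` has returned.

```javascript
const { once } = require("node:events");

// Hypothetical helper: submits every chunk of an iterable or async iterable
// (for example a Readable stream) to a pool-like object, and stops pulling
// new chunks while the queue is at its limit.
async function feedPool(pool, source) {
  const pending = [];
  for await (const chunk of source) {
    const task = pool.run(chunk);
    // Mark the task as handled so an early failure does not surface as an
    // unhandled rejection; Promise.all below still rejects with the error.
    task.catch(() => {});
    pending.push(task);

    // Queue is full: wait for 'drain' before reading the next chunk
    if (pool.queueSize >= pool.options.maxQueue) {
      await once(pool, "drain");
    }
  }
  // Resolves with all task results, in submission order
  return Promise.all(pending);
}
```

Because the loop does not request the next chunk while it is waiting, a Node.js stream passed as `source` is not read further during that time, so no explicit `pause()` or `resume()` calls are needed.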
Out of Scope Asynchronous Code
A worker thread is only active until the moment it returns a result, which can be the result of a synchronous call or a Promise that will be fulfilled or rejected in the future. Once the result is available, Piscina waits for stdout and stderr to be flushed, and then pauses the worker's event loop until the next call. Piscina has no way of detecting async code that is scheduled without being awaited before returning, so execution of that code resumes on the next call. It is therefore highly recommended to properly handle all async tasks before returning a result; otherwise your code may behave unpredictably.
For example:
- JavaScript
- TypeScript
const { setTimeout } = require("timers/promises");

module.exports = ({ a, b }) => {
  // This promise should be awaited
  setTimeout(1000).then(() => {
    console.log("Working"); // This will **not** run during the same worker call
  });

  return a + b;
};
import { setTimeout } from "timers/promises";

interface OperationInput {
  a: number;
  b: number;
}

export default ({ a, b }: OperationInput): number => {
  // This promise should be awaited
  setTimeout(1000).then(() => {
    console.log("Working"); // This will **not** run during the same worker call
  });

  return a + b;
};
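For contrast, a corrected version of the JavaScript handler above might look like the following sketch. The function name `add` and the shorter delay are illustrative choices; the only essential change is that the timer is awaited before the result is returned.

```javascript
const { setTimeout: sleep } = require("node:timers/promises");

// Handler that finishes all of its asynchronous work before returning,
// so nothing is left pending when the worker's event loop is paused.
async function add({ a, b }) {
  // Awaiting keeps the timer inside the current task
  await sleep(100);
  console.log("Working"); // Runs during the same worker call

  return a + b;
}

module.exports = add;
```

Since the handler now returns a Promise that settles only after the log line has run, the task's result is not reported until all scheduled work is complete.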